package org.apache.lucene.replicator;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.document.Document;
import org.apache.lucene.facet.DrillDownQuery;
import org.apache.lucene.facet.FacetField;
import org.apache.lucene.facet.Facets;
import org.apache.lucene.facet.FacetsCollector;
import org.apache.lucene.facet.FacetsConfig;
import org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts;
import org.apache.lucene.facet.taxonomy.TaxonomyReader;
import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter;
import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

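/**
 * Tests {@link ReplicationClient} together with {@link IndexAndTaxonomyReplicationHandler},
 * replicating a search index and its sidecar taxonomy index over local directories.
 */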
public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
  
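  /**
   * Invoked by the handler after each successful replication; re-opens the readers
   * and verifies that the replicated search index and taxonomy index are consistent.
   */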
  private static class IndexAndTaxonomyReadyCallback implements Callable<Boolean>, Closeable {
    
    private final Directory indexDir, taxoDir;
    private DirectoryReader indexReader;
    private DirectoryTaxonomyReader taxoReader;
    private FacetsConfig config;
    private long lastIndexGeneration = -1;
    
    public IndexAndTaxonomyReadyCallback(Directory indexDir, Directory taxoDir) throws IOException {
      this.indexDir = indexDir;
      this.taxoDir = taxoDir;
      config = new FacetsConfig();
      config.setHierarchical("A", true);
      if (DirectoryReader.indexExists(indexDir)) {
        indexReader = DirectoryReader.open(indexDir);
        lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
        taxoReader = new DirectoryTaxonomyReader(taxoDir);
      }
    }
    
    @Override
    public Boolean call() throws Exception {
      if (indexReader == null) {
        indexReader = DirectoryReader.open(indexDir);
        lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
        taxoReader = new DirectoryTaxonomyReader(taxoDir);
      } else {
        // verify search index
        DirectoryReader newReader = DirectoryReader.openIfChanged(indexReader);
        assertNotNull("should not have reached here if no changes were made to the index", newReader);
        long newGeneration = newReader.getIndexCommit().getGeneration();
        assertTrue("expected newer generation; current=" + lastIndexGeneration + " new=" + newGeneration, newGeneration > lastIndexGeneration);
        indexReader.close();
        indexReader = newReader;
        lastIndexGeneration = newGeneration;
        TestUtil.checkIndex(indexDir);
        
        // verify taxonomy index
        DirectoryTaxonomyReader newTaxoReader = TaxonomyReader.openIfChanged(taxoReader);
        if (newTaxoReader != null) {
          taxoReader.close();
          taxoReader = newTaxoReader;
        }
        TestUtil.checkIndex(taxoDir);
        
        // verify faceted search
        int id = Integer.parseInt(indexReader.getIndexCommit().getUserData().get(VERSION_ID), 16);
        IndexSearcher searcher = new IndexSearcher(indexReader);
        FacetsCollector fc = new FacetsCollector();
        searcher.search(new MatchAllDocsQuery(), fc);
        Facets facets = new FastTaxonomyFacetCounts(taxoReader, config, fc);
        assertEquals(1, facets.getSpecificValue("A", Integer.toString(id, 16)).intValue());
        
        DrillDownQuery drillDown = new DrillDownQuery(config);
        drillDown.add("A", Integer.toString(id, 16));
        TopDocs docs = searcher.search(drillDown, 10);
        assertEquals(1, docs.totalHits);
      }
      return null;
    }
    
    @Override
    public void close() throws IOException {
      IOUtils.close(indexReader, taxoReader);
    }
  }
  
  private Directory publishIndexDir, publishTaxoDir;
  private MockDirectoryWrapper handlerIndexDir, handlerTaxoDir;
  private Replicator replicator;
  private SourceDirectoryFactory sourceDirFactory;
  private ReplicationClient client;
  private ReplicationHandler handler;
  private IndexWriter publishIndexWriter;
  private SnapshotDirectoryTaxonomyWriter publishTaxoWriter;
  private FacetsConfig config;
  private IndexAndTaxonomyReadyCallback callback;
  private Path clientWorkDir;
  
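  // key in the commit userData under which each revision records its id (in hex)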
  private static final String VERSION_ID = "version";
  
  private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
    // Loop as long as the client is alive; the test framework will terminate us if
    // there's a serious bug, e.g. the client doesn't really update. We don't add our
    // own timeouts since they can easily lead to false positives.
    while (client.isUpdateThreadAlive()) {
      // give client a chance to update
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw new ThreadInterruptedException(e);
      }
      
      try {
        DirectoryReader reader = DirectoryReader.open(dir);
        try {
          int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
          if (expectedID == handlerID) {
            return;
          }
        } finally {
          reader.close();
        }
      } catch (Exception e) {
        // We can hit IndexNotFoundException or e.g. EOFException (on segments_N)
        // because the file is being copied at the same time it is read by
        // DirectoryReader.open().
      }
    }
  }
  
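  // Adds a single document for the given id, records the id (in hex) in the commit
  // userData, commits both writers and snapshots them into a new revision.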
  private Revision createRevision(final int id) throws IOException {
    publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
    publishIndexWriter.setCommitData(new HashMap<String, String>() {{
      put(VERSION_ID, Integer.toString(id, 16));
    }});
    publishIndexWriter.commit();
    publishTaxoWriter.commit();
    return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
  }
  
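  // Builds a document whose only content is the facet "A/<id in hex>".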
  private Document newDocument(TaxonomyWriter taxoWriter, int id) throws IOException {
    Document doc = new Document();
    doc.add(new FacetField("A", Integer.toString(id, 16)));
    return config.build(taxoWriter, doc);
  }
  
  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    publishIndexDir = newDirectory();
    publishTaxoDir = newDirectory();
    handlerIndexDir = newMockDirectory();
    handlerTaxoDir = newMockDirectory();
    clientWorkDir = createTempDir("replicationClientTest");
    sourceDirFactory = new PerSessionDirectoryFactory(clientWorkDir);
    replicator = new LocalReplicator();
    callback = new IndexAndTaxonomyReadyCallback(handlerIndexDir, handlerTaxoDir);
    handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, callback);
    client = new ReplicationClient(replicator, handler, sourceDirFactory);
    
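    // the revisions snapshot commits, so the index writer needs a SnapshotDeletionPolicy
    // and the taxonomy writer must be a SnapshotDirectoryTaxonomyWriter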
    IndexWriterConfig conf = newIndexWriterConfig(null);
    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
    publishIndexWriter = new IndexWriter(publishIndexDir, conf);
    publishTaxoWriter = new SnapshotDirectoryTaxonomyWriter(publishTaxoDir);
    config = new FacetsConfig();
    config.setHierarchical("A", true);
  }
  
  @After
  @Override
  public void tearDown() throws Exception {
    publishIndexWriter.close();
    IOUtils.close(client, callback, publishTaxoWriter, replicator, publishIndexDir, publishTaxoDir,
        handlerIndexDir, handlerTaxoDir);
    super.tearDown();
  }
  
  @Test
  public void testNoUpdateThread() throws Exception {
    assertNull("no version expected at start", handler.currentVersion());
    
    // Callback validates the replicated index
    replicator.publish(createRevision(1));
    client.updateNow();
    
    // make sure updating twice, when in fact there's nothing to update, works
    client.updateNow();
    
    replicator.publish(createRevision(2));
    client.updateNow();
    
    // Publish two revisions without update, handler should be upgraded to latest
    replicator.publish(createRevision(3));
    replicator.publish(createRevision(4));
    client.updateNow();
  }
  
  @Test
  public void testRestart() throws Exception {
    replicator.publish(createRevision(1));
    client.updateNow();
    
    replicator.publish(createRevision(2));
    client.updateNow();
    
    client.stopUpdateThread();
    client.close();
    client = new ReplicationClient(replicator, handler, sourceDirFactory);
    
    // Publish two revisions without update, handler should be upgraded to latest
    replicator.publish(createRevision(3));
    replicator.publish(createRevision(4));
    client.updateNow();
  }
  
  @Test
  public void testUpdateThread() throws Exception {
    client.startUpdateThread(10, "indexTaxo");
    
    replicator.publish(createRevision(1));
    assertHandlerRevision(1, handlerIndexDir);
    
    replicator.publish(createRevision(2));
    assertHandlerRevision(2, handlerIndexDir);
    
    // Publish two revisions without update, handler should be upgraded to latest
    replicator.publish(createRevision(3));
    replicator.publish(createRevision(4));
    assertHandlerRevision(4, handlerIndexDir);
  }
  
  @Test
  public void testRecreateTaxonomy() throws Exception {
    replicator.publish(createRevision(1));
    client.updateNow();
    
    // recreate index and taxonomy
    Directory newTaxo = newDirectory();
    new DirectoryTaxonomyWriter(newTaxo).close();
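    // replace the current taxonomy with the fresh (empty) one; the next revision
    // should replicate the recreated taxonomy and the emptied index cleanly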
    publishTaxoWriter.replaceTaxonomy(newTaxo);
    publishIndexWriter.deleteAll();
    replicator.publish(createRevision(2));
    
    client.updateNow();
    newTaxo.close();
  }
  
  /*
   * This test verifies that the client and handler do not end up with a corrupt
   * index if exceptions are thrown at any point during replication: either when
   * the client copies files from the server to the temporary space, or when the
   * handler copies them to the index directory.
   */
  @Test
  public void testConsistencyOnExceptions() throws Exception {
    // so the handler's index isn't empty
    replicator.publish(createRevision(1));
    client.updateNow();
    client.close();
    callback.close();
    
    // Replicator violates write-once policy. It may be that the
    // handler copies files to the index dir, then fails to copy a
    // file and reverts the copy operation. On the next attempt, it
    // will copy the same file again. There is nothing wrong with this
    // in a real system, but it does violate write-once, and MDW
    // doesn't like it. Disabling it means that we won't catch cases
    // where the handler overwrites an existing index file, but
    // there's nothing currently we can do about it, unless we don't
    // use MDW.
    handlerIndexDir.setPreventDoubleWrite(false);
    handlerTaxoDir.setPreventDoubleWrite(false);
    
    // wrap sourceDirFactory to return a MockDirectoryWrapper so we can simulate errors
    final SourceDirectoryFactory in = sourceDirFactory;
    final AtomicInteger failures = new AtomicInteger(atLeast(10));
    sourceDirFactory = new SourceDirectoryFactory() {
      
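      // every simulated failure doubles the allowed directory size and halves the
      // exception rate, so replication eventually succeeds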
      private long clientMaxSize = 100, handlerIndexMaxSize = 100, handlerTaxoMaxSize = 100;
      private double clientExRate = 1.0, handlerIndexExRate = 1.0, handlerTaxoExRate = 1.0;
      
      @Override
      public void cleanupSession(String sessionID) throws IOException {
        in.cleanupSession(sessionID);
      }
      
      @SuppressWarnings("synthetic-access")
      @Override
      public Directory getDirectory(String sessionID, String source) throws IOException {
        Directory dir = in.getDirectory(sessionID, source);
        if (random().nextBoolean() && failures.get() > 0) { // client should fail, return wrapped dir
          MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
          mdw.setRandomIOExceptionRateOnOpen(clientExRate);
          mdw.setMaxSizeInBytes(clientMaxSize);
          mdw.setRandomIOExceptionRate(clientExRate);
          mdw.setCheckIndexOnClose(false);
          clientMaxSize *= 2;
          clientExRate /= 2;
          return mdw;
        }
        
        if (failures.get() > 0 && random().nextBoolean()) { // handler should fail
          if (random().nextBoolean()) { // index dir fail
            handlerIndexDir.setMaxSizeInBytes(handlerIndexMaxSize);
            handlerIndexDir.setRandomIOExceptionRate(handlerIndexExRate);
            handlerIndexDir.setRandomIOExceptionRateOnOpen(handlerIndexExRate);
            handlerIndexMaxSize *= 2;
            handlerIndexExRate /= 2;
          } else { // taxo dir fail
            handlerTaxoDir.setMaxSizeInBytes(handlerTaxoMaxSize);
            handlerTaxoDir.setRandomIOExceptionRate(handlerTaxoExRate);
            handlerTaxoDir.setRandomIOExceptionRateOnOpen(handlerTaxoExRate);
            handlerTaxoDir.setCheckIndexOnClose(false);
            handlerTaxoMaxSize *= 2;
            handlerTaxoExRate /= 2;
          }
        } else {
          // disable all errors
          handlerIndexDir.setMaxSizeInBytes(0);
          handlerIndexDir.setRandomIOExceptionRate(0.0);
          handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
          handlerTaxoDir.setMaxSizeInBytes(0);
          handlerTaxoDir.setRandomIOExceptionRate(0.0);
          handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
        }
        
        return dir;
      }
    };
    
    handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        if (random().nextDouble() < 0.2 && failures.get() > 0) {
          throw new RuntimeException("random exception from callback");
        }
        return null;
      }
    });
    
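    // set when the handler hits an unexpected (non-IO) error or when the post-failure
    // consistency checks fail; stops the publish loop at the end of the test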
    final AtomicBoolean failed = new AtomicBoolean();
    
    // wrap handleUpdateException so we can act on the thrown exception
    client = new ReplicationClient(replicator, handler, sourceDirFactory) {
      @SuppressWarnings("synthetic-access")
      @Override
      protected void handleUpdateException(Throwable t) {
        if (t instanceof IOException) {
          try {
            if (VERBOSE) {
              System.out.println("hit exception during update: " + t);
              t.printStackTrace(System.out);
            }
            
            // check that the index can still be opened, and verify some basic statistics
            DirectoryReader reader = DirectoryReader.open(handlerIndexDir.getDelegate());
            try {
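              // each revision adds exactly one document, so the doc count must equal
              // the version id recorded in the last successful commit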
              int numDocs = reader.numDocs();
              int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
              assertEquals(numDocs, version);
            } finally {
              reader.close();
            }
            // verify index is fully consistent
            TestUtil.checkIndex(handlerIndexDir.getDelegate());
            
            // verify the taxonomy index is fully consistent (since we only add one
            // category to all documents, there's not much more to validate)
            ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
            CheckIndex.Status indexStatus = null;
            
            try (CheckIndex checker = new CheckIndex(handlerTaxoDir.getDelegate())) {
              checker.setFailFast(true);
              checker.setInfoStream(new PrintStream(bos, false, IOUtils.UTF_8), false);
              try {
                indexStatus = checker.checkIndex(null);
              } catch (IOException | RuntimeException ioe) {
                // ok: we fall back below
              }
            }
            
            if (indexStatus == null || indexStatus.clean == false) {
              
              // Because the taxonomy index's segments file is replicated after the
              // main index's segments file, if there's an error while replicating the
              // main index's segments file and the virus checker prevents deletion of
              // the taxonomy index's segments file, it can look like corruption. But
              // it should be "false" corruption: if we remove the latest segments
              // file, the remaining index is intact. It's like pulling a hideous
              // looking rock out of the ground, but then you pull the cruft
              // off the outside of it and discover it's actually a beautiful
              // diamond:
              String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(handlerTaxoDir);
              assertTrue(handlerTaxoDir.didTryToDelete(segmentsFileName));
              handlerTaxoDir.getDelegate().deleteFile(segmentsFileName);
              TestUtil.checkIndex(handlerTaxoDir.getDelegate());
            }
          } catch (IOException e) {
            failed.set(true);
            throw new RuntimeException(e);
          } catch (RuntimeException e) {
            failed.set(true);
            throw e;
          } finally {
            // count down the number of allowed failures
            failures.decrementAndGet();
            assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
            if (VERBOSE) {
              if (failures.get() == 0) {
                System.out.println("no more failures expected");
              } else {
                System.out.println("num failures left: " + failures.get());
              }
            }
          }
        } else {
          failed.set(true);
          if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
          }
          throw new RuntimeException(t);
        }
      }
    };
    
    client.startUpdateThread(10, "indexAndTaxo");
    
    final Directory baseHandlerIndexDir = handlerIndexDir.getDelegate();
    int numRevisions = atLeast(20) + 2;
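    // revision 1 was already published above, so start from 2; stop early if the
    // handler hit an unexpected failure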
    for (int i = 2; i < numRevisions && failed.get() == false; i++) {
      replicator.publish(createRevision(i));
      assertHandlerRevision(i, baseHandlerIndexDir);
    }
    
    // disable errors -- maybe randomness didn't exhaust all allowed failures,
    // and we don't want e.g. CheckIndex to hit false errors.
    handlerIndexDir.setMaxSizeInBytes(0);
    handlerIndexDir.setRandomIOExceptionRate(0.0);
    handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
    handlerTaxoDir.setMaxSizeInBytes(0);
    handlerTaxoDir.setRandomIOExceptionRate(0.0);
    handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
  }
  
}